In [ ]:
# %%bash

# pip install tensorflow==1.7
# pip install tensorflow-transform

Text Classification using TensorFlow and Google Cloud - Part 3

This bigquery-public-data:hacker_news contains all stories and comments from Hacker News from its launch in 2006. Each story contains a story id, url, the title of the story, tthe author that made the post, when it was written, and the number of points the story received.

The objective is, given the title of the story, we want to build an ML model that can predict the source of this story.

TF Custom Estimator Word Embedding for Text Classification

This notebook illustrates how to build a TF Custom Estimator for Text Classification. The model will make use of the 'bow' feature in the transformed dataset as the input layer. The 'bow' feature is a sparce vector of integers representing the indecies of the words in the text (title). The model will also make use of the "vocabolary" file produced during the tf.transform pipeline as a lookup for word index.

  1. Define the metadata
  2. Define data input function
  3. Create custom estimator with model_fn
  4. Setup experiement
    • Hyper-parameters & RunConfig
    • Serving function (for exported model)
    • TrainSpec & EvalSpec
  5. Run experiement
  6. Evalute the model
  7. Use SavedModel for prediction

Setting Global Parameters


In [1]:
import os

class Params:
    pass

# Set to run on GCP
Params.GCP_PROJECT_ID = 'ksalama-gcp-playground'
Params.REGION = 'europe-west1'
Params.BUCKET = 'ksalama-gcs-cloudml'

Params.PLATFORM = 'local' # local | GCP

Params.DATA_DIR = 'data/news'  if Params.PLATFORM == 'local' else 'gs://{}/data/news'.format(Params.BUCKET)

Params.TRANSFORMED_DATA_DIR = os.path.join(Params.DATA_DIR, 'transformed')
Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX = os.path.join(Params.TRANSFORMED_DATA_DIR, 'train')
Params.TRANSFORMED_EVAL_DATA_FILE_PREFIX = os.path.join(Params.TRANSFORMED_DATA_DIR, 'eval')

Params.TEMP_DIR = os.path.join(Params.DATA_DIR, 'tmp')

Params.MODELS_DIR = 'models/news' if Params.PLATFORM == 'local' else 'gs://{}/models/news'.format(Params.BUCKET)

Params.TRANSFORM_ARTEFACTS_DIR = os.path.join(Params.MODELS_DIR,'transform')

Params.TRAIN = True

Params.RESUME_TRAINING = False

Params.EAGER = False

if Params.EAGER:
    tf.enable_eager_execution()

Importing libraries


In [2]:
import tensorflow as tf
from tensorflow import data


from tensorflow.contrib.learn.python.learn.utils import input_fn_utils
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
from tensorflow_transform.tf_metadata import metadata_io
from tensorflow_transform.tf_metadata import dataset_schema
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.saved import saved_transform_io

print tf.__version__


WARNING:tensorflow:From /Users/khalidsalama/Technology/python-venvs/py27-venv/lib/python2.7/site-packages/tensorflow/contrib/learn/python/learn/datasets/base.py:198: retry (from tensorflow.contrib.learn.python.learn.datasets.base) is deprecated and will be removed in a future version.
Instructions for updating:
Use the retry module or similar alternatives.
1.7.0

1. Define Metadata


In [3]:
RAW_HEADER = 'key,title,source'.split(',')
RAW_DEFAULTS = [['NA'],['NA'],['NA']]
TARGET_FEATURE_NAME = 'source'
TARGET_LABELS = ['github', 'nytimes', 'techcrunch']
TEXT_FEATURE_NAME = 'title'
KEY_COLUMN = 'key'

VOCAB_SIZE = 20000
TRAIN_SIZE = 73124
EVAL_SIZE = 23079

PAD_VALUE = VOCAB_SIZE + 1
VOCAB_LIST_FILE = os.path.join(Params.TRANSFORM_ARTEFACTS_DIR, 'transform_fn/assets/vocab_string_to_int_uniques')
MAX_WORDS_PER_TITLE = 10


raw_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema({
    KEY_COLUMN: dataset_schema.ColumnSchema(
        tf.string, [], dataset_schema.FixedColumnRepresentation()),
    TEXT_FEATURE_NAME: dataset_schema.ColumnSchema(
        tf.string, [], dataset_schema.FixedColumnRepresentation()),
    TARGET_FEATURE_NAME: dataset_schema.ColumnSchema(
        tf.string, [], dataset_schema.FixedColumnRepresentation()),
}))


transformed_metadata = metadata_io.read_metadata(
    os.path.join(Params.TRANSFORM_ARTEFACTS_DIR,"transformed_metadata"))

raw_feature_spec = raw_metadata.schema.as_feature_spec()
transformed_feature_spec = transformed_metadata.schema.as_feature_spec()

print transformed_feature_spec


{u'source': FixedLenFeature(shape=[], dtype=tf.string, default_value=None), u'title': FixedLenFeature(shape=[], dtype=tf.string, default_value=None), u'weight': VarLenFeature(dtype=tf.float32), u'bow': VarLenFeature(dtype=tf.int64)}

2. Define Input Function


In [4]:
def parse_tf_example(tf_example):
    
    parsed_features = tf.parse_single_example(serialized=tf_example, features=transformed_feature_spec)
    target = parsed_features.pop(TARGET_FEATURE_NAME)
    
    return parsed_features, target


def generate_tfrecords_input_fn(files_pattern, 
                          mode=tf.estimator.ModeKeys.EVAL, 
                          num_epochs=1, 
                          batch_size=200):
    
    def _input_fn():
        
        file_names = data.Dataset.list_files(files_pattern)

        if Params.EAGER:
            print file_names

        dataset = data.TFRecordDataset(file_names )

        dataset = dataset.apply(
                tf.contrib.data.shuffle_and_repeat(count=num_epochs,
                                                   buffer_size=batch_size*2)
        )

        dataset = dataset.apply(
                tf.contrib.data.map_and_batch(parse_tf_example, 
                                              batch_size=batch_size, 
                                              num_parallel_batches=2)
        )

        datset = dataset.prefetch(batch_size)

        if Params.EAGER:
            return dataset

        iterator = dataset.make_one_shot_iterator()
        features, target = iterator.get_next()
        return features, target
    
    return _input_fn

3. Create Custom Estimator using Model Function

3.1 Define model_fn


In [5]:
def _bow_to_vector(sparse_bow_indecies):
    
    # Convert sparse tensor to dense tensor by padding each entry to match the longest in the batch
    bow_indecies = tf.sparse_tensor_to_dense(sparse_bow_indecies, default_value=PAD_VALUE)
    
    # Create a word_ids padding
    padding = tf.constant([[0,0],[0, MAX_WORDS_PER_TITLE]])
    
    # Pad all the word_ids entries to the maximum document length
    bow_indecies_padded = tf.pad(bow_indecies, padding)
    
    bow_vector = tf.slice(bow_indecies_padded, [0,0], [-1, MAX_WORDS_PER_TITLE])
    
    # Return the final word_id_vector
    return bow_vector

def _shallow_layers(features, params):
    
        # word_id_vector
        bow_vector = _bow_to_vector(features['bow']) 

        # layer to take each word_id and convert it into vector (embeddings) 
        word_embeddings = tf.contrib.layers.embed_sequence(bow_vector, 
                                                           vocab_size=VOCAB_SIZE+2, 
                                                           embed_dim=params.embedding_size)
    
        ### CNN Model ############################################################################
        if params.model_type == 'CNN':
    
            words_conv = tf.layers.conv1d(word_embeddings, 
                                          filters=params.filters, 
                                          kernel_size=params.window_size, 
                                          strides=int(window_size/2), 
                                          padding='SAME', activation=tf.nn.relu)

            words_conv_shape = words_conv.get_shape()
            dim = words_conv_shape[1] * words_conv_shape[2]
            shallow_layer = tf.reshape(words_conv,[-1, dim])
        
        
        ### LSTM Model ############################################################################
        elif params.model_type == 'LSTM':
            
            rnn_layers = [tf.nn.rnn_cell.LSTMCell(
                num_units=size, 
                forget_bias=params.forget_bias,
                activation=tf.nn.tanh) for size in params.hidden_units]

            # create a RNN cell composed sequentially of a number of RNNCells
            multi_rnn_cell = tf.nn.rnn_cell.MultiRNNCell(rnn_layers)
    
            shallow_layer = tf.unstack(word_embeddings, axis=1)
        
        ### MAX MIN Embedding Model ############################################################################
        
        else:
            
            # Repesent the doc embedding as the concatenation of MIN and MAX of the word embeddings
            doc_embeddings_min = tf.reduce_min(word_embeddings, axis=1)
            doc_embeddings_max = tf.reduce_max(word_embeddings, axis=1)
            shallow_layer = tf.concat([doc_embeddings_min, doc_embeddings_max], axis=1)
        
        return shallow_layer
    

def _fully_connected_layers(inputs, params):
    
        hidden_layers = inputs
        
        if params.hidden_units is not None:

            # Create a fully-connected layer-stack based on the hidden_units in the params
            hidden_layers = tf.contrib.layers.stack(
                inputs=inputs,
                layer=tf.contrib.layers.fully_connected,
                stack_args= params.hidden_units,
                activation_fn=tf.nn.relu)
        
        return hidden_layers
            
        
    

def model_fn(features, labels, mode, params): 

    # Create the input layers via CNN, LSTM, or MAX+MIN embeddings
    shallow_layers_output = _shallow_layers(features, params)
    
    # Create FCN using hidden units
    hidden_layers = _fully_connected_layers(shallow_layers_output, params)
    
    # Number of classes
    output_layer_size = len(TARGET_LABELS)
    
    # Connect the output layer (logits) to the hidden layer (no activation fn)
    logits = tf.layers.dense(inputs=hidden_layers, 
                             units=output_layer_size, 
                             activation=None)
    

    head = tf.contrib.estimator.multi_class_head(
        n_classes=len(TARGET_LABELS),
        label_vocabulary=TARGET_LABELS,
        name='classification_head'
    )
    
    
    def _train_op_fn(loss):

        # Create Optimiser
        optimizer = tf.train.AdamOptimizer(
            learning_rate=params.learning_rate)

        # Create training operation
        train_op = optimizer.minimize(
            loss=loss, global_step=tf.train.get_global_step())

        return train_op

    
    return head.create_estimator_spec(
        features,
        mode,
        logits,
        labels=labels,
        train_op_fn=_train_op_fn
    )

3.2 Create Custom Estimator using model_fn


In [6]:
def create_estimator(hparams, run_config):
    
    estimator = tf.estimator.Estimator(model_fn=model_fn, 
                                  params=hparams, 
                                  config=run_config)

    return estimator

4. Setup Experiment

4.1 HParams and RunConfig


In [7]:
NUM_EPOCHS = 10
BATCH_SIZE = 1000

TOTAL_STEPS = (TRAIN_SIZE/BATCH_SIZE)*NUM_EPOCHS
EVAL_EVERY_SEC = 60

hparams  = tf.contrib.training.HParams(
    num_epochs=NUM_EPOCHS,
    batch_size=BATCH_SIZE,
    embedding_size = 50, # word embedding vector size
    learning_rate=0.01,
    hidden_units=[64, 32],
    max_steps=TOTAL_STEPS,
    model_type='MAXMIN_EMBEDDING', # CNN | LSTM | MAXMIN_EMBEDDING

    #CNN Params
    window_size = 3,
    filters = 2,
    
    #LSTM Params
    forget_bias=1.0,
    keep_prob = 0.8,

)

MODEL_NAME = 'dnn_estimator_custom' 
model_dir = os.path.join(Params.MODELS_DIR, MODEL_NAME)

run_config = tf.estimator.RunConfig(
    tf_random_seed=19830610,
    log_step_count_steps=1000,
    save_checkpoints_secs=EVAL_EVERY_SEC,
    keep_checkpoint_max=1,
    model_dir=model_dir
)


print(hparams)
print("")
print("Model Directory:", run_config.model_dir)
print("Dataset Size:", TRAIN_SIZE)
print("Batch Size:", BATCH_SIZE)
print("Steps per Epoch:",TRAIN_SIZE/BATCH_SIZE)
print("Total Steps:", TOTAL_STEPS)


[('batch_size', 1000), ('embedding_size', 50), ('hidden_units', [64, 32]), ('learning_rate', 0.01), ('max_steps', 730), ('model_type', 'MAXMIN_EMBEDDING'), ('num_epochs', 10), ('trainable_embedding', False)]

('Model Directory:', 'models/news/dnn_estimator_custom')
('Dataset Size:', 73124)
('Batch Size:', 1000)
('Steps per Epoch:', 73)
('Total Steps:', 730)

4.2 Serving function


In [8]:
def generate_serving_input_fn():
    
    def _serving_fn():
    
        receiver_tensor = {
          'title': tf.placeholder(dtype=tf.string, shape=[None])
        }

        _, transformed_features = (
            saved_transform_io.partially_apply_saved_transform(
                os.path.join(Params.TRANSFORM_ARTEFACTS_DIR, transform_fn_io.TRANSFORM_FN_DIR),
            receiver_tensor)
        )
        
        return tf.estimator.export.ServingInputReceiver(
            transformed_features, receiver_tensor)
    
    return _serving_fn

4.3 TrainSpec & EvalSpec


In [9]:
train_spec = tf.estimator.TrainSpec(
    input_fn = generate_tfrecords_input_fn(
        Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX+"*",
        mode = tf.estimator.ModeKeys.TRAIN,
        num_epochs=hparams.num_epochs,
        batch_size=hparams.batch_size
    ),
    max_steps=hparams.max_steps,
    hooks=None
)

eval_spec = tf.estimator.EvalSpec(
    input_fn = generate_tfrecords_input_fn(
        Params.TRANSFORMED_EVAL_DATA_FILE_PREFIX+"*",
        mode=tf.estimator.ModeKeys.EVAL,
        num_epochs=1,
        batch_size=hparams.batch_size
    ),
    exporters=[tf.estimator.LatestExporter(
        name="estimate", # the name of the folder in which the model will be exported to under export
        serving_input_receiver_fn=generate_serving_input_fn(),
        exports_to_keep=1,
        as_text=False)],
    steps=None,
    throttle_secs=EVAL_EVERY_SEC
)

5. Run experiment


In [10]:
from datetime import datetime
import shutil

if Params.TRAIN:
    if not Params.RESUME_TRAINING:
        print("Removing previous training artefacts...")
        shutil.rmtree(model_dir, ignore_errors=True)
    else:
        print("Resuming training...") 


    tf.logging.set_verbosity(tf.logging.INFO)

    time_start = datetime.utcnow() 
    print("Experiment started at {}".format(time_start.strftime("%H:%M:%S")))
    print(".......................................") 

    estimator = create_estimator(hparams, run_config)

    tf.estimator.train_and_evaluate(
        estimator=estimator,
        train_spec=train_spec, 
        eval_spec=eval_spec
    )

    time_end = datetime.utcnow() 
    print(".......................................")
    print("Experiment finished at {}".format(time_end.strftime("%H:%M:%S")))
    print("")
    time_elapsed = time_end - time_start
    print("Experiment elapsed time: {} seconds".format(time_elapsed.total_seconds()))
else:
    print "Training was skipped!"


Removing previous training artefacts...
Experiment started at 13:54:37
.......................................
INFO:tensorflow:Using config: {'_save_checkpoints_secs': 60, '_session_config': None, '_keep_checkpoint_max': 1, '_tf_random_seed': 19830610, '_task_type': 'worker', '_global_id_in_cluster': 0, '_is_chief': True, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x11fb639d0>, '_model_dir': 'models/news/dnn_estimator_custom', '_num_worker_replicas': 1, '_task_id': 0, '_log_step_count_steps': 1000, '_master': '', '_save_checkpoints_steps': None, '_keep_checkpoint_every_n_hours': 10000, '_evaluation_master': '', '_service': None, '_save_summary_steps': 100, '_num_ps_replicas': 0}
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after 60 secs (eval_spec.throttle_secs) or training is finished.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Saving checkpoints for 1 into models/news/dnn_estimator_custom/model.ckpt.
INFO:tensorflow:loss = 1098.9199, step = 1
INFO:tensorflow:loss = 288.47186, step = 101 (5.831 sec)
INFO:tensorflow:loss = 210.72809, step = 201 (5.488 sec)
INFO:tensorflow:loss = 135.61783, step = 301 (5.683 sec)
INFO:tensorflow:loss = 64.686, step = 401 (4.954 sec)
INFO:tensorflow:loss = 51.20141, step = 501 (4.932 sec)
INFO:tensorflow:loss = 11.550993, step = 601 (4.965 sec)
INFO:tensorflow:loss = 19.073622, step = 701 (5.968 sec)
INFO:tensorflow:Saving checkpoints for 730 into models/news/dnn_estimator_custom/model.ckpt.
INFO:tensorflow:Loss for final step: 24.172638.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2018-05-14-13:55:19
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from models/news/dnn_estimator_custom/model.ckpt-730
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Finished evaluation at 2018-05-14-13:55:19
INFO:tensorflow:Saving dict for global step 730: accuracy/classification_head = 0.81940293, average_loss/classification_head = 0.9810729, global_step = 730, loss = 943.42426
WARNING:tensorflow:Expected binary or unicode string, got type_url: "type.googleapis.com/tensorflow.AssetFileDef"
value: "\n\t\n\007Const:0\022\033vocab_string_to_int_uniques"

INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Signatures INCLUDED in export for Classify: ['serving_default', 'classification']
INFO:tensorflow:Signatures INCLUDED in export for Regress: None
INFO:tensorflow:Signatures INCLUDED in export for Predict: ['predict']
INFO:tensorflow:Restoring parameters from models/news/dnn_estimator_custom/model.ckpt-730
INFO:tensorflow:Assets added to graph.
INFO:tensorflow:Assets written to: models/news/dnn_estimator_custom/export/estimate/temp-1526306120/assets
INFO:tensorflow:SavedModel written to: models/news/dnn_estimator_custom/export/estimate/temp-1526306120/saved_model.pb
.......................................
Experiment finished at 13:55:20

Experiment elapsed time: 42.53896 seconds

7. Evaluate the model


In [11]:
tf.logging.set_verbosity(tf.logging.ERROR)

estimator = create_estimator(hparams, run_config)

train_metrics = estimator.evaluate(
    input_fn = generate_tfrecords_input_fn(
        files_pattern= Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX+"*", 
        mode= tf.estimator.ModeKeys.EVAL,
        batch_size= TRAIN_SIZE), 
    steps=1
)


print("############################################################################################")
print("# Train Measures: {}".format(train_metrics))
print("############################################################################################")

eval_metrics = estimator.evaluate(
    input_fn=generate_tfrecords_input_fn(
        files_pattern= Params.TRANSFORMED_EVAL_DATA_FILE_PREFIX+"*", 
        mode= tf.estimator.ModeKeys.EVAL,
        batch_size= EVAL_SIZE), 
    steps=1
)
print("")
print("############################################################################################")
print("# Eval Measures: {}".format(eval_metrics))
print("############################################################################################")


############################################################################################
# Train Measures: {'average_loss/classification_head': 0.014071254, 'accuracy/classification_head': 0.99587005, 'global_step': 730, 'loss': 1028.9464}
############################################################################################

############################################################################################
# Eval Measures: {'average_loss/classification_head': 0.9810715, 'accuracy/classification_head': 0.81940293, 'global_step': 730, 'loss': 22642.148}
############################################################################################

8. Use Saved Model for Predictions


In [12]:
import os

export_dir = model_dir +"/export/estimate/"
saved_model_dir = os.path.join(export_dir, os.listdir(export_dir)[0])

print(saved_model_dir)
print("")

predictor_fn = tf.contrib.predictor.from_saved_model(
    export_dir = saved_model_dir,
    signature_def_key="predict"
)

output = predictor_fn(
    {
        'title':[
            'Microsoft and Google are joining forces for a new AI framework',
            'A new version of Python is mind blowing',
            'EU is investigating new data privacy policies'
        ]
        
    }
)
print(output)


models/news/dnn_estimator_custom/export/estimate/1526306120

{u'probabilities': array([[5.2824073e-02, 9.4629961e-01, 8.7631273e-04],
       [1.4882383e-05, 9.9805754e-01, 1.9275737e-03],
       [1.5370923e-04, 9.9855918e-01, 1.2871033e-03]], dtype=float32), u'class_ids': array([[1],
       [1],
       [1]]), u'classes': array([['nytimes'],
       ['nytimes'],
       ['nytimes']], dtype=object), u'logits': array([[-1.4969722,  1.38862  , -5.5959716],
       [-7.0172634,  4.096124 , -2.1534247],
       [-5.1254215,  3.6535845, -3.0003347]], dtype=float32)}

In [ ]: